-
Notifications
You must be signed in to change notification settings - Fork 0
Clone kafka 18894 #30
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
Your organization has reached the review limit (30/30 reviews used). To continue using Refacto code reviews, please upgrade your subscription. |
WalkthroughReplaces direct ConfigProvider usage with Plugin-wrapped providers across clients and Connect. Updates constructors, method signatures, and control flow to pass Plugin into ConfigTransformer and lifecycle management to close plugins. Adds metrics wiring via Plugins.newConfigProvider, test coverage for monitorable providers, and minor docs/comment/style tweaks. Changes
Sequence Diagram(s)sequenceDiagram
autonumber
actor App as Application
participant AC as AbstractConfig
participant ICP as instantiateConfigProviders
participant CT as ConfigTransformer
participant P as Plugin<ConfigProvider>
participant CP as ConfigProvider
App->>AC: resolveConfigVariables(originals)
AC->>ICP: instantiateConfigProviders(...)
ICP-->>AC: Map<String, Plugin<ConfigProvider>> (providerPlugins)
alt providerPlugins is empty
AC-->>App: originals (no transformation)
else providerPlugins present
AC->>CT: new ConfigTransformer(providerPlugins)
loop each variable
CT->>P: get()
P-->>CT: CP
CT->>CP: get(path, keys)
CP-->>CT: ConfigData
end
CT-->>AC: transformed configs + TTLs
AC-->>App: ResolvingMap (merged)
end
note over AC,P: On shutdown, close provider plugins
sequenceDiagram
autonumber
actor W as Worker
participant Pl as Plugins
participant M as Metrics
participant L as PluginLoader
participant P as Plugin<ConfigProvider>
participant CP as ConfigProvider
participant WCT as WorkerConfigTransformer
W->>Pl: newConfigProvider(config, providerName, usage, metrics)
Pl->>L: load provider class for providerName
L-->>Pl: ConfigProvider instance
Pl->>CP: configure(config subset)
Pl->>P: Plugin.wrapInstance(CP, metrics, baseTag, {provider: name})
P-->>Pl: Plugin<ConfigProvider>
Pl-->>W: Plugin<ConfigProvider>
W->>WCT: new WorkerConfigTransformer(worker, {name: P})
WCT->>P: pass to ConfigTransformer(...)
note over P,CP: Metrics tags applied (config/class/provider)
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Poem
Pre-merge checks and finishing touches❌ Failed checks (3 warnings)
✨ Finishing touches
🧪 Generate unit tests
Warning There were issues while running some tools. Please review the errors and either fix the tool's configuration or disable the tool if it's a critical failure. 🔧 ast-grep (0.39.5)connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.javaThanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
Summary of ChangesHello @DDShantanuBadmanji, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request introduces a significant architectural enhancement by wrapping ConfigProvider instances in a generic Plugin object. This change standardizes how configuration providers are loaded, configured, and managed throughout Kafka, particularly within Kafka Connect. A key benefit is the enablement of metrics registration for ConfigProvider's, allowing for improved monitoring and operational visibility. The modifications involve updating core configuration handling, plugin loading mechanisms, and associated test cases to ensure robust and observable configuration management. Highlights
Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension. Footnotes
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review
This pull request introduces a mechanism to expose metrics from ConfigProvider implementations by wrapping them in a Plugin object. This is a valuable addition for monitoring and observability. The changes are well-implemented, following existing patterns within the Kafka codebase. The refactoring is applied consistently across AbstractConfig, ConfigTransformer, MirrorMakerConfig, and Worker. The Worker class is correctly updated to wire in the metrics registry, enabling this feature in Kafka Connect. The new tests are comprehensive, covering both the metrics registration and the plugin lifecycle, ensuring the correctness of the implementation. I have reviewed the changes and found no issues of medium or higher severity. The code quality is excellent.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (5)
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java (1)
273-282: Guard against null newConfigProvider results before inserting into the mapPlugins.newConfigProvider(...) may return null when a provider name lacks a corresponding ".class" config. Avoid inserting null entries and optionally log to aid debugging.
- for (String name : providerNames) { - Plugin<ConfigProvider> configProviderPlugin = plugins.newConfigProvider( + for (String name : providerNames) { + Plugin<ConfigProvider> configProviderPlugin = plugins.newConfigProvider( this, name, Plugins.ClassLoaderUsage.PLUGINS, null ); - providerPlugins.put(name, configProviderPlugin); + if (configProviderPlugin != null) { + providerPlugins.put(name, configProviderPlugin); + } else { + // optionally: log at debug/warn that provider 'name' is not configured + } }connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java (2)
2904-2951: Start/stop Worker to exercise provider creation path and avoid flakinessCurrently the test does not call worker.start(); if provider instantiation moves to start(), this test may stop validating metrics. Start and stop the worker for stability.
- worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, noneConnectorClientConfigOverridePolicy); + worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, noneConnectorClientConfigOverridePolicy); + worker.start(); Metrics metrics = worker.metrics().metrics(); assertMetrics(metrics, 1, expectedTags(WorkerConfig.CONFIG_PROVIDERS_CONFIG, MonitorableConfigProvider.class.getSimpleName(), Map.of("provider", "monitorable"))); assertMetrics(metrics, 1, expectedTags(WorkerConfig.CONFIG_PROVIDERS_CONFIG, MonitorableConfigProvider.class.getSimpleName(), Map.of("provider", "monitorable2"))); + worker.stop();
2937-2951: Relax tag equality to tolerate default/global metric context tagsMetricName.tags() may include global tags (e.g., from MetricsConfig). Comparing equality can make the test brittle. Check that expected tags are a subset.
- for (MetricName metricName : metrics.metrics().keySet()) { + for (MetricName metricName : metrics.metrics().keySet()) { if (metricName.group().equals("plugins")) { Map<String, String> tags = metricName.tags(); - if (expectedTags.equals(tags)) { + if (tags.entrySet().containsAll(expectedTags.entrySet())) { assertEquals(MonitorableConfigProvider.NAME, metricName.name()); assertEquals(MonitorableConfigProvider.DESCRIPTION, metricName.description()); found++; } } }clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java (2)
98-105: Minor: simplify/clarify null checkskeysByPath cannot be null here; only providerPlugin needs a null check. Optional tidy-up:
- Map<String, Set<String>> keysByPath = entry.getValue(); - if (providerPlugin != null && keysByPath != null) { + Map<String, Set<String>> keysByPath = entry.getValue(); + if (providerPlugin != null) {
95-113: TTL map may collide across providers with identical pathsttls.put(path, ttl) can overwrite when multiple providers use the same path string. If downstream consumers rely on per-provider TTLs, consider scoping by provider+path.
- Map<String, Long> ttls = new HashMap<>(); + Map<String, Long> ttls = new HashMap<>(); for (Map.Entry<String, Map<String, Set<String>>> entry : keysByProvider.entrySet()) { String providerName = entry.getKey(); Plugin<ConfigProvider> providerPlugin = configProviderPlugins.get(providerName); Map<String, Set<String>> keysByPath = entry.getValue(); if (providerPlugin != null && keysByPath != null) { for (Map.Entry<String, Set<String>> pathWithKeys : keysByPath.entrySet()) { String path = pathWithKeys.getKey(); Set<String> keys = new HashSet<>(pathWithKeys.getValue()); ConfigData configData = providerPlugin.get().get(path, keys); @@ - if (ttl != null && ttl >= 0) { - ttls.put(path, ttl); - } + if (ttl != null && ttl >= 0) { + // e.g., use providerName + ":" + path as key if appropriate for consumers of ConfigTransformerResult + ttls.put(path, ttl); + }Note: apply only if ConfigTransformerResult consumers can handle disambiguated TTL keys.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (13)
Vagrantfile(1 hunks)clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java(4 hunks)clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java(3 hunks)clients/src/main/java/org/apache/kafka/common/config/provider/ConfigProvider.java(1 hunks)clients/src/test/java/org/apache/kafka/common/config/ConfigTransformerTest.java(5 hunks)clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java(1 hunks)connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java(2 hunks)connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java(1 hunks)connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java(3 hunks)connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java(3 hunks)connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java(5 hunks)connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java(5 hunks)connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java(4 hunks)
🧰 Additional context used
🧬 Code graph analysis (10)
clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java (1)
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala (1)
withPluginMetrics(6299-6301)
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java (2)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
Plugin(33-92)connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java (1)
WorkerConfig(58-463)
clients/src/test/java/org/apache/kafka/common/config/ConfigTransformerTest.java (1)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
Plugin(33-92)
clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java (2)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
Plugin(33-92)clients/src/main/java/org/apache/kafka/common/utils/Utils.java (1)
Utils(93-1700)
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java (3)
clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java (1)
MonitorableConfigProvider(29-58)clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
Plugin(33-92)clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java (1)
Metrics(70-696)
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java (1)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
Plugin(33-92)
clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java (1)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
Plugin(33-92)
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java (2)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
Plugin(33-92)clients/src/main/java/org/apache/kafka/common/utils/Utils.java (1)
Utils(93-1700)
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java (2)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
Plugin(33-92)connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java (1)
Plugins(59-696)
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java (3)
clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java (1)
MonitorableConfigProvider(29-58)clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
Plugin(33-92)clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java (1)
Metrics(70-696)
🔇 Additional comments (30)
Vagrantfile (1)
21-21: Comment adds useful guardrail.Clear warning helps future edits avoid accidental API version changes.
clients/src/test/java/org/apache/kafka/common/config/ConfigTransformerTest.java (4)
20-20: LGTM! Plugin import added for wrapper support.The import enables Plugin.wrapInstance usage in test setup, aligning with the plugin-based provider lifecycle management.
49-49: LGTM! Test setup correctly uses Plugin wrapper.The test provider is now wrapped with
Plugin.wrapInstance(new TestConfigProvider(), null, "config.providers"), matching the production code's plugin-based approach. Passingnullfor metrics is appropriate in tests unless metric behavior is being validated.
54-59: Good addition of TTL assertion.The
assertTrue(ttls.isEmpty())assertion on line 58 validates that basic variable replacement doesn't produce TTL values. This strengthens test coverage.
63-68: LGTM! Consistent modernization to Map.of.Replacing
Collections.singletonMapwithMap.ofthroughout the test methods improves readability and aligns with modern Java practices.Also applies to: 72-75, 79-82, 86-89, 93-98, 102-105
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java (1)
210-223: LGTM! Provider initialization migrated to plugin-based approach.The
initConfigTransformermethod correctly:
- Creates a
Map<String, Plugin<ConfigProvider>>for plugin-wrapped providers (line 212)- Calls
plugins.newConfigProviderwith metrics support viametrics.metrics()(lines 214-219)- Passes the plugin map to
WorkerConfigTransformer(line 222)This enables proper lifecycle management and metrics wiring for config providers.
clients/src/main/java/org/apache/kafka/common/config/provider/ConfigProvider.java (1)
32-35: LGTM! Clear documentation for metrics support.The added JavaDoc accurately describes how ConfigProvider implementations can enable metrics by implementing
Monitorable, and specifies the automatic tags (config,class,provider) that will be added to all registered metrics.connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java (4)
21-21: LGTM! Plugin import added for test setup.Enables Plugin.wrapInstance usage in the test configuration.
69-69: LGTM! Test setup correctly uses Plugin wrapper.The test provider is wrapped with
Plugin.wrapInstance(new TestConfigProvider(), null, "config.providers"), matching the production code changes. Passingnullfor metrics is appropriate for this test scenario.
75-75: LGTM! Consistent modernization to Map.of.Test method inputs updated from verbose map constructions to clean
Map.ofsyntax.Also applies to: 100-100, 115-115, 122-122
150-157: LGTM! TestConfigProvider updated with Map.of.The mock provider now returns
ConfigDatainstances usingMap.offor cleaner test data construction. This aligns with the modernization throughout the test suite.connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java (3)
23-23: LGTM! Plugin import added.Required for the plugin-based provider lifecycle management.
46-52: LGTM! Constructor correctly migrated to plugin-based providers.The changes properly:
- Update the field type to
Map<String, Plugin<ConfigProvider>>(line 46)- Accept plugin-wrapped providers in the constructor (line 48)
- Store the plugin map (line 50)
- Pass the plugin map to ConfigTransformer (line 51)
This enables proper lifecycle management and metrics support for config providers.
100-102: LGTM! Plugin cleanup correctly implemented.The
close()method now closes plugin instances instead of raw providers. SincePlugin.close()handles both the wrapped ConfigProvider and any associated PluginMetrics, this ensures proper resource cleanup.connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java (1)
275-280: Confirm classloader usage is intentionalUsing Plugins.ClassLoaderUsage.PLUGINS here changes resolution compared to CURRENT_CLASSLOADER. Please confirm this matches MirrorMaker’s expected isolation behavior. If not, switch to CURRENT_CLASSLOADER.
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java (1)
382-390: Good migration to Plugin-wrapped provider in test setupWrapping MockFileConfigProvider with Plugin and stubbing the new signature keeps tests aligned with runtime changes. LGTM.
clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java (1)
60-69: Migration to Plugin looks correctConstructor and field updates are consistent; usage via providerPlugin.get() is appropriate.
clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java (1)
29-58: LGTM! Well-designed test helper for metrics wiring.The class correctly implements both ConfigProvider and Monitorable interfaces with appropriate stub methods for testing. The
protectedvisibility of theconfiguredfield enables subclass verification patterns (used inPluginsTest.CustomMonitorableConfigProvider).connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java (3)
22-23: LGTM! Required imports for Plugin-based metrics wiring.
632-632: LGTM! API updated to support Plugin-wrapped providers with metrics.The signature changes align with the broader refactoring:
- Return type
Plugin<ConfigProvider>enables automatic lifecycle management and metrics wiringproviderNameparameter is clearer than the previousproviderPrefixMetricsparameter enables metrics forMonitorableproviders
633-649: LGTM! Correct Plugin wrapping with appropriate tags.The implementation correctly:
- Derives
providerPrefixfromproviderName(line 633)- Wraps the provider with
Plugin.wrapInstanceincluding metrics and proper tags (line 649)- Tags will be:
{"config": "config.providers", "class": "<ClassName>", "provider": "<providerName>"}- Automatic
withPluginMetricsinvocation forMonitorableprovidersconnect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java (4)
25-28: LGTM! Required imports for Plugin-based testing.
380-402: LGTM! Test correctly updated for Plugin-wrapped API.The test properly exercises the new signature:
- Uses
providerNameparameter (clearer thanproviderPrefix)- Accesses provider via
plugin.get()- Validates plugin classloader behavior
404-412: LGTM! Test validates metrics wiring order.The test correctly verifies that:
MonitorableConfigProviderinstances can be wrapped with metrics- The provider's
withPluginMetricsis invoked by the Plugin wrapperThe test implicitly validates the call order through
CustomMonitorableConfigProvider.withPluginMetrics(), which asserts thatconfigure()was called first (lines 813-815).
810-816: LGTM! Test helper validates configure-before-metrics contract.The assertion
assertTrue(configured)(line 814) correctly validates thatconfigure()is called beforewithPluginMetrics(), which is the expected order in thePlugins.newConfigProvider()implementation (configure at lines 646-648, wrap at line 649).clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java (5)
23-23: LGTM: Import added for Plugin wrapper.The import is necessary to support wrapping
ConfigProviderinstances withPluginfor lifecycle and metrics management.
550-558: LGTM: Variable transformation adapts to plugin-wrapped providers.The refactor correctly updates the variable name and type to
Map<String, Plugin<ConfigProvider>>, and passes the plugin map toConfigTransformer. The control flow and logic remain sound.
559-559: LGTM: Closing logic correctly handles plugin lifecycle.The code properly closes each
Plugin<ConfigProvider>instance, which will clean up both the wrapped provider (ifAutoCloseable) and any associated plugin metrics.
598-602: LGTM: Method signature updated to return plugin-wrapped providers.The return type correctly reflects the new pattern of wrapping
ConfigProviderinstances inPluginfor lifecycle and metrics management.
624-639: Remove metrics verification in AbstractConfig.wrapInstance
Passingnullfor theMetricsparameter inPlugin.wrapInstance(provider, null, …)is intentional and consistent—metrics are injected at higher layers viaPlugins.newConfigProvider. No changes needed.Likely an incorrect or invalid review comment.
|
/refacto-bot |
|
Refacto is reviewing this PR. Please wait for the review comments to be posted. |
Multi-Domain Review: Java Compatibility👍 Well Done
📌 Files Processed
|
| @Override | ||
| public Future<Map<ByteBuffer, ByteBuffer>> get(final Collection<ByteBuffer> keys) { | ||
| ConvertingFutureCallback<Void, Map<ByteBuffer, ByteBuffer>> future = new ConvertingFutureCallback<>() { | ||
| ConvertingFutureCallback<Void, Map<ByteBuffer, ByteBuffer>> future = new ConvertingFutureCallback<Void, Map<ByteBuffer, ByteBuffer>>() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Java 8 Compatibility
Diamond operator without explicit type parameters is not compatible with Java 8. This syntax requires Java 9+, but the codebase appears to target Java 8 compatibility.
Standards
- Java 8 Compatibility
- Language Version Compliance
| }) | ||
| .sorted(Comparator.comparing(docInfo -> docInfo.predicateName)) | ||
| .toList(); | ||
| .collect(Collectors.toList()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Java 8 Stream API
The toList() method on Stream is a Java 16+ feature. Using this in a codebase targeting Java 8 compatibility will cause compilation failures.
Standards
- Java 8 Compatibility
- Stream API Compatibility
| public class TransformationDoc { | ||
|
|
||
| private record DocInfo(String transformationName, String overview, ConfigDef configDef) { | ||
| private static final class DocInfo { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Java Record Replacement
Java records were introduced in Java 14, but the codebase appears to target Java 8 compatibility. Using records will cause compilation failures in Java 8 environments.
Standards
- Java 8 Compatibility
- Language Feature Compatibility
| * the connector. | ||
| */ | ||
| public record ConnectorTaskId(String connector, int task) implements Serializable, Comparable<ConnectorTaskId> { | ||
| public class ConnectorTaskId implements Serializable, Comparable<ConnectorTaskId> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Java Record Usage
Java records were introduced in Java 14, but the codebase appears to target Java 8 compatibility. This will cause compilation failures in Java 8 environments.
Standards
- Java 8 Compatibility
- Language Feature Compatibility
| } | ||
|
|
||
| private record TestDTO(String content) { | ||
| private static class TestDTO { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Java Record Usage
Java records were introduced in Java 14, but the codebase appears to target Java 8 compatibility. This will cause compilation failures in Java 8 environments.
Standards
- Java 8 Compatibility
- Language Feature Compatibility
| package org.apache.kafka.connect.integration; | ||
|
|
||
| public record StartsAndStops(int starts, int stops) { | ||
| public class StartsAndStops { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Java Record Usage
Java records were introduced in Java 14, but the codebase appears to target Java 8 compatibility. This will cause compilation failures in Java 8 environments.
Standards
- Java 8 Compatibility
- Language Feature Compatibility
| switch (supportLevel) { | ||
| case EXACTLY_ONCE_SUPPORTED: | ||
| return ExactlyOnceSupport.SUPPORTED; | ||
| case EXACTLY_ONCE_UNSUPPORTED: | ||
| return ExactlyOnceSupport.UNSUPPORTED; | ||
| case EXACTLY_ONCE_FAIL: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Switch Expression Usage
Switch expressions were introduced in Java 14, but the codebase appears to target Java 8 compatibility. This will cause compilation failures in Java 8 environments.
Standards
- Java 8 Compatibility
- Language Feature Compatibility
| switch (supportLevel) { | ||
| case TRANSACTION_BOUNDARIES_SUPPORTED: | ||
| return ConnectorTransactionBoundaries.SUPPORTED; | ||
| case TRANSACTION_BOUNDARIES_FAIL: | ||
| throw new ConnectException("oh no :("); | ||
| case TRANSACTION_BOUNDARIES_NULL: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Switch Expression Usage
Switch expressions were introduced in Java 14, but the codebase appears to target Java 8 compatibility. This will cause compilation failures in Java 8 environments.
Standards
- Java 8 Compatibility
- Language Feature Compatibility
| "simulated-task-producer-" + CONNECTOR_NAME + "-" + i, | ||
| Worker.taskTransactionalId(CLUSTER_GROUP_ID, CONNECTOR_NAME, i) | ||
| )).toList(); | ||
| )).collect(Collectors.toList()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Stream toList Usage
The toList() method on Stream is a Java 16+ feature. Using this in a codebase targeting Java 8 compatibility will cause compilation failures.
Standards
- Java 8 Compatibility
- Stream API Compatibility
| private static String formatAssignment(Map<String, Collection<String>> assignment) { | ||
| StringBuilder result = new StringBuilder(); | ||
| for (String worker : assignment.keySet().stream().sorted().toList()) { | ||
| for (String worker : assignment.keySet().stream().sorted().collect(Collectors.toList())) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Stream toList Usage
The toList() method on Stream is a Java 16+ feature. Using this in a codebase targeting Java 8 compatibility will cause compilation failures.
Standards
- Java 8 Compatibility
- Stream API Compatibility
| .map(ConfigValueInfo::errors) | ||
| .flatMap(Collection::stream) | ||
| .toList(); | ||
| .collect(Collectors.toList()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Stream toList Usage
The toList() method on Stream is a Java 16+ feature. Using this in a codebase targeting Java 8 compatibility will cause compilation failures.
Standards
- Java 8 Compatibility
- Stream API Compatibility
| String expectedType; | ||
| switch (connectorType) { | ||
| case SINK: | ||
| expectedType = "sink"; | ||
| break; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Switch Expression Usage
Switch expressions were introduced in Java 14, but the codebase appears to target Java 8 compatibility. This will cause compilation failures in Java 8 environments.
Standards
- Java 8 Compatibility
- Language Feature Compatibility
| .sorted(Comparator.reverseOrder()) | ||
| .map(Path::toFile) | ||
| .toList(); | ||
| .collect(Collectors.toList()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Stream toList Usage
The toList() method on Stream is a Java 16+ feature. Using this in a codebase targeting Java 8 compatibility will cause compilation failures.
Standards
- Java 8 Compatibility
- Stream API Compatibility
| .filter(path -> !path.toFile().getName().endsWith(".java")) | ||
| .filter(path -> !removeRuntimeClasses.test(path.toFile().getName())) | ||
| .toList(); | ||
| .collect(Collectors.toList()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Stream toList Usage
The toList() method on Stream is a Java 16+ feature. Using this in a codebase targeting Java 8 compatibility will cause compilation failures.
Standards
- Java 8 Compatibility
- Stream API Compatibility
| "\"config\": {}" + | ||
| "}"; | ||
| StringEntity entity = new StringEntity(jsonBody, StandardCharsets.UTF_8); | ||
| StringEntity entity = new StringEntity(jsonBody, StandardCharsets.UTF_8.name()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
StringEntity Charset
The StringEntity constructor with Charset parameter was introduced in HttpClient 4.3+. For compatibility with older versions, the charset name string should be used instead.
Standards
- API Compatibility
- HttpClient Compatibility
| private String executePut(URI serverUrl, String endpoint, String jsonBody) throws IOException { | ||
| HttpPut request = new HttpPut(endpoint); | ||
| StringEntity entity = new StringEntity(jsonBody, StandardCharsets.UTF_8); | ||
| StringEntity entity = new StringEntity(jsonBody, StandardCharsets.UTF_8.name()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
StringEntity Charset
The StringEntity constructor with Charset parameter was introduced in HttpClient 4.3+. For compatibility with older versions, the charset name string should be used instead.
Standards
- API Compatibility
- HttpClient Compatibility
| ConfigInfos result = herder.validateConnectorConfig(config, s -> null, false); | ||
|
|
||
| assertEquals(ConnectorType.SOURCE, herder.connectorType(config)); | ||
| assertEquals(herder.connectorType(config), ConnectorType.SOURCE); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Assertion Order
The assertEquals method parameters are in the wrong order. The expected value should be the first parameter, and the actual value should be the second parameter. This can lead to confusing error messages when tests fail.
Standards
- JUnit Best Practices
- Test Readability
|
|
||
| ConfigInfos result = herder.validateConnectorConfig(config, s -> null, false); | ||
| assertEquals(ConnectorType.SOURCE, herder.connectorType(config)); | ||
| assertEquals(herder.connectorType(config), ConnectorType.SOURCE); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Assertion Order
The assertEquals method parameters are in the wrong order. The expected value should be the first parameter, and the actual value should be the second parameter. This can lead to confusing error messages when tests fail.
Standards
- JUnit Best Practices
- Test Readability
|
|
||
| ConfigInfos result = herder.validateConnectorConfig(config, s -> null, false); | ||
| assertEquals(ConnectorType.SOURCE, herder.connectorType(config)); | ||
| assertEquals(herder.connectorType(config), ConnectorType.SOURCE); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Assertion Order
The assertEquals method parameters are in the wrong order. The expected value should be the first parameter, and the actual value should be the second parameter. This can lead to confusing error messages when tests fail.
Standards
- JUnit Best Practices
- Test Readability
|
The user @DDShantanuBadmanji does not have an active seat allocated for code reviews. Please contact your organization admin to assign a seat or upgrade your subscription. |
Delete this text and replace it with a detailed description of your change. The
PR title and body will become the squashed commit message.
If you would like to tag individuals, add some commentary, upload images, or
include other supplemental information that should not be part of the eventual
commit message, please use a separate comment.
If applicable, please include a summary of the testing strategy (including
rationale) for the proposed change. Unit and/or integration tests are expected
for any behavior change and system tests should be considered for larger
changes.
Summary by CodeRabbit
New Features
Documentation
Refactor
Tests
Chores